package org.budo.warehouse.logic.producer.http;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.Resource;

import org.budo.support.http.client.Request;
import org.budo.support.http.client.Response;
import org.budo.support.java.concurrent.thread.factory.BudoThreadFactory;
import org.budo.support.lang.util.ListUtil;
import org.budo.support.lang.util.ThreadUtil;
import org.budo.support.servlet.util.QueryStringUtil;
import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataEntryPojo;
import org.budo.warehouse.logic.api.DataEntryPojo.Column;
import org.budo.warehouse.logic.api.DataEntryPojo.Row;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.api.DataMessagePojo;
import org.budo.warehouse.logic.util.DataEntryUtil;
import org.budo.warehouse.service.api.IEntryBufferService;
import org.budo.warehouse.service.entity.DataNode;
import org.budo.warehouse.service.entity.EntryBuffer;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.beans.factory.InitializingBean;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * TODO 做登陆,下文件
 * 
 * @author lmw
 */
@Getter
@Setter
@Slf4j
public class LakalaCrawlerDataProducer implements InitializingBean, Runnable {
    private static final String PARAMS = "hidBeginTime=2019-06-01+00%3A00%3A00&hidEndTime=&pageth=1&excel=0&sort=&desc=0&beginTime=2019-06-01+00%3A00%3A00&endTime=&caller=&inbound=0&disposition=&callee=&client=0&mixmin=&maxmin=&local=0&asd=1&jp=1";

    private DataNode dataNode;

    @Resource(name = "dispatcherDataConsumer")
    private DataConsumer dataConsumer;

    @Resource
    private IEntryBufferService entryBufferService;

    @Override
    public void afterPropertiesSet() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor(new BudoThreadFactory("LakalaCrawlerDataProducer"));
        executorService.submit(this); // 启动任务
    }

    @Override
    public void run() {
        while (true) {
            this.crawler();
        }
    }

    private void crawler() {
        try {
            this.doCrawler();

            // 间隔时间
            Integer interval = QueryStringUtil.getParameterInteger(this.getDataNode().getUrl(), "interval", 10);
            ThreadUtil.sleep(1000 * interval);
        } catch (Throwable e) {
            log.error("#40 doCrawler error, e=" + e, e);
        }
    }

    private void doCrawler() {
        String password = this.getDataNode().getPassword(); // PHPSESSID
        String url = this.getDataNode().getUrl().substring(7); // lakala:http://61.155.215.234:8081/call400_admin/meetme_control.php?s=1&t=4

        Request request = new Request();
        request.setMethod(Request.POST);
        request.setUrl(url);
        request.setCookies("PHPSESSID=" + password);
        request.setRequestBody(PARAMS);

        Response response = request.execute();

        String responseBody = response.getBody();
        if (responseBody.contains("登录超时，请重新登录！")) {
            log.error("#93 responseBody=" + responseBody);
            return;
        }

        Document document = Jsoup.parse(responseBody);
        Elements trs = document.select("tr");

        List<Row> rows = this.rows(trs);
        if (null == rows || rows.isEmpty()) {
            log.error("#98 rows=" + rows);
            return;
        }

        List<Row> rowsNew = new ArrayList<Row>();
        // 每行一条记录
        for (Row row : rows) {
            DataEntry dataEntry = new DataEntryPojo() //
                    .setRows(ListUtil.toList(row));
            List<DataEntry> dataEntries = ListUtil.toList(dataEntry);

            DataMessagePojo dataMessage = new DataMessagePojo();
            dataMessage.setDataNodeId(this.getDataNode().getId());
            dataMessage.setDataEntries(dataEntries);

            if (this.isNew(row)) {
                rowsNew.add(row);
            }
        }

        if (rows.size() == rowsNew.size()) {
            log.warn("#114 全都是新的,需要往下翻页");
        }

        this.consumeRows(rowsNew);
    }

    private void consumeRows(List<Row> rows) {
        for (Row row : rows) {
            DataEntry dataEntry = new DataEntryPojo() //
                    .setExecuteTime(System.currentTimeMillis())//
                    .setEventType(DataEntry.EventType.INSERT) //
                    .setSchemaName("lakala") //
                    .setTableName("call_record") //
                    .setRows(ListUtil.toList(row));
            List<DataEntry> dataEntries = ListUtil.toList(dataEntry);
            DataMessage dataMessageNew = new DataMessagePojo(null, this.getDataNode().getId(), dataEntries);
            dataConsumer.consume(dataMessageNew);
        }
    }

    private boolean isNew(Row row) {
        String rowsJson = DataEntryUtil.rowsToJson(ListUtil.toList(row));
        EntryBuffer entryBuffer = new EntryBuffer().setRows(rowsJson);
        Integer countByExample = entryBufferService.countByExample(entryBuffer);
        return countByExample < 1;
    }

    /**
     * 一页记录
     */
    private List<Row> rows(Elements trs) {
        List<Row> rows = new ArrayList<Row>();

        for (Element tr : trs) {
            Elements tds = tr.select("td");
            if (tds.size() < 12) {
                continue;
            }

            Row row = this.row(tr);

            rows.add(row);
        }

        return rows;
    }

    /**
     * 一行记录
     */
    private Row row(Element tr) {
        List<Column> columns = new ArrayList<Column>();

        String staff = tr.select("td").get(0).text();
        columns.add(new Column().setName("staff").setValueAfter(staff));

        String startAt = tr.select("td").get(1).text();
        columns.add(new Column().setName("startAt").setValueAfter(startAt));

        String endAt = tr.select("td").get(2).text();
        columns.add(new Column().setName("endAt").setValueAfter(endAt));

        String sourceNumber = tr.select("td").get(3).text();
        columns.add(new Column().setName("sourceNumber").setValueAfter(sourceNumber));

        String destinationNumber = tr.select("td").get(4).text();
        columns.add(new Column().setName("destinationNumber").setValueAfter(destinationNumber));

        String customerName = tr.select("td").get(5).text();
        columns.add(new Column().setName("customerName").setValueAfter(customerName));

        String duration = tr.select("td").get(6).text();
        columns.add(new Column().setName("duration").setValueAfter(duration));

        String fee = tr.select("td").get(7).text();
        columns.add(new Column().setName("fee").setValueAfter(fee));

        String direction = tr.select("td").get(8).text();
        columns.add(new Column().setName("direction").setValueAfter(direction));

        String distance = tr.select("td").get(9).text();
        columns.add(new Column().setName("distance").setValueAfter(distance));

        String disposition = tr.select("td").get(10).text();
        columns.add(new Column().setName("disposition").setValueAfter(disposition));

        Elements input = tr.select("input[value='下载录音']");
        String onclick = input.attr("onclick");
        String downUrl = "http://61.155.215.234:8081/call400_admin/" + onclick.substring(19, onclick.length() - 1);
        columns.add(new Column().setName("downUrl").setValueAfter(downUrl));

        return new Row() //
                .setColumns(columns);
    }
}